 1.任务调度系统Airflow之任务集成部署中Airflow核心概念
   
   DAGs：有向无环图(Directed Acyclic Graph)，将所有需要运行的tasks按照依赖
关系组织起来，描述的是所有tasks执行的顺序；
   Operators：Airflow内置了很多operators
       BashOperator 执行一个bash 命令
       PythonOperator 调用任意的 Python 函数
       EmailOperator 用于发送邮件
       HTTPOperator 用于发送HTTP请求
       SqlOperator 用于执行SQL命令
       自定义Operator
   Tasks：Task 是 Operator的一个实例；
   Task Instance：由于Task会被重复调度，每次task的运行就是不同的 Task instance。
Task instance 有自己的状态，包括success 、running 、failed 、skipped 、
up_for_reschedule 、up_for_retry 、queued 、no_status 等；
   Task Relationships：DAGs中的不同Tasks之间可以有依赖关系；
   执行器（Executor）。Airflow支持的执行器就有四种：
       SequentialExecutor：单进程顺序执行任务，默认执行器，通常只用于测试
       LocalExecutor：多进程本地执行任务
       CeleryExecutor：分布式调度，生产常用。Celery是一个分布式调度框架，其本身
无队列功能，需要使用第三方组件，如RabbitMQ
       DaskExecutor ：动态任务调度，主要用于数据分析
       执行器的修改。修改$AIRFLOW_HOME/airflow.cfg 第70行:executor= LocalExecutor。
修改后启动服务

 2.入门案例
   
   放置在 $AIRFLOW_HOME/dags 目录下

from datetime import datetime, timedelta
from airflow import DAG
from airflow.utils import dates
from airflow.utils.helpers import chain
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

def default_options():
   default_args = {
        'owner':'airflow',                  # 拥有者名称
        'start_date': dates.days_ago(1),    # 第一次开始执行的时间
        'retries': 1,                       # 失败重试次数
        'retry_delay': timedelta(seconds=5) # 失败重试间隔
   }
   return default_args

# 定义DAG
def task1(dag):
    t = "pwd"
    # operator支持多种类型，这里使用 BashOperator
    task = BashOperator(
	  task_id='MyTask1', # task_id
      bash_command=t,    # 指定要执行的命令
      dag=dag            # 指定归属的dag
    )
    return task

def hello_world():
    current_time = str(datetime.today())
    print('hello world at {}'.format(current_time))

def task2(dag):
    # Python Operator
    task = PythonOperator(
        task_id='MyTask2',
        python_callable=hello_world,    # 指定要执行的函数
        dag=dag)
    return task

def task3(dag):
    t = "date"
    task = BashOperator(
        task_id='MyTask3',
        bash_command=t,
        dag=dag)
    return task

with DAG(
        'HelloWorldDag',                # dag_id
        default_args=default_options(), # 指定默认参数
        schedule_interval="*/2 * * * *" # 执行周期，每分钟2次
) as d:
     task1 = task1(d)
     task2 = task2(d)
     task3 = task3(d)
     chain(task1, task2, task3)         # 指定执行顺序
   
   # 执行命令检查脚本是否有错误。如果命令行没有报错，就表示没问题
   python $AIRFLOW_HOME/dags/helloworld.py
   # 查看生效的 dags
   airflow list_dags -sd $AIRFLOW_HOME/dags
   # 查看指定dag中的task
   airflow list_tasks HelloWorldDag
   # 测试dag中的task
   airflow test HelloWorldDag MyTask2 20200801
 
 3.核心交易调度任务集成
   
   核心交易分析
   # 加载ODS数据（DataX迁移数据）
   /data/lagoudw/script/trade/ods_load_trade.sh
   # 加载DIM层数据
   /data/lagoudw/script/trade/dim_load_product_cat.sh
   /data/lagoudw/script/trade/dim_load_shop_org.sh
   /data/lagoudw/script/trade/dim_load_payment.sh
   /data/lagoudw/script/trade/dim_load_product_info.sh
   # 加载DWD层数据
   /data/lagoudw/script/trade/dwd_load_trade_orders.sh
   # 加载DWS层数据
   /data/lagoudw/script/trade/dws_load_trade_orders.sh
   # 加载ADS层数据
   /data/lagoudw/script/trade/ads_load_trade_order_analysis.sh
   
   备注： depends_on_past ，设置为True时，上一次调度成功了，才可以触发。
   $AIRFLOW_HOME/dags
   
from datetime import timedelta
import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

# 定义dag的缺省参数
default_args = {
	
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': '2020-06-20',
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# 定义DAG
coretradedag = DAG(
    'coretrade',
    default_args=default_args,
    description='core trade analyze',
    schedule_interval='30 0 * * *',
)

today=datetime.date.today()
oneday=timedelta(days=1)
yesterday=(today-oneday).strftime("%Y-%m-%d")

odstask = BashOperator(
    task_id='ods_load_data',
    depends_on_past=False,
    bash_command='sh /data/lagoudw/script/trade/ods_load_trade.sh '
+ yesterday,
    dag=coretradedag
)

dimtask1 = BashOperator(
    task_id='dimtask_product_cat',
    depends_on_past=False,
    bash_command='sh /data/lagoudw/script/trade/dim_load_product_cat.sh '
+
yesterday,
    dag=coretradedag
)

dimtask2 = BashOperator(
    task_id='dimtask_shop_org',
    depends_on_past=False,
    bash_command='sh /data/lagoudw/script/trade/dim_load_shop_org.sh '
+ yesterday,
    dag=coretradedag
)

dimtask3 = BashOperator(
    task_id='dimtask_payment',
    depends_on_past=False,
    bash_command='sh /data/lagoudw/script/trade/dim_load_payment.sh '
+ yesterday,
    dag=coretradedag
)

dimtask4 = BashOperator(
    task_id='dimtask_product_info',
    depends_on_past=False,
    bash_command='sh /data/lagoudw/script/trade/dim_load_product_info.sh '
+
yesterday,
    dag=coretradedag
)

dwdtask = BashOperator(
    task_id='dwd_load_data',
    depends_on_past=False,
    bash_command='sh /data/lagoudw/script/trade/dwd_load_trade_orders.sh '
+
yesterday,
    dag=coretradedag
)

dwstask = BashOperator(
    task_id='dws_load_data',
    depends_on_past=False,
    bash_command='sh /data/lagoudw/script/trade/dws_load_trade_orders.sh ' 
+
yesterday,
   dag=coretradedag
)

adstask = BashOperator(
    task_id='ads_load_data',
    depends_on_past=False,
	bash_command='sh /data/lagoudw/script/trade/ads_load_trade_order_analysis.sh '
+ yesterday,
    dag=coretradedag
)

odstask >> dimtask1
odstask >> dimtask2
odstask >> dimtask3
odstask >> dimtask4
odstask >> dwdtask

dimtask1 >> dwstask
dimtask2 >> dwstask
dimtask3 >> dwstask
dimtask4 >> dwstask
dwdtask >> dwstask

dwstask >> adstask


airflow list_dags
airflow list_tasks coretrade --tree

